import json
import requests
import datetime as dt
from IPython.display import display, Math, Latex, Image
import matplotlib.pyplot as plt
from matplotlib.dates import date2num, num2date
from matplotlib import dates as mdates
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.offline as pyo
from scipy.optimize import curve_fit, brentq
from scipy import stats as sps
from scipy.interpolate import interp1d
from scipy.signal import argrelextrema
import numpy as np
import pandas as pd
pyo.init_notebook_mode()
Stima downloads dell'app Immuni (vedere link per il metodo utilizzato).
Image(filename='immuni.png')
info@maxpierini.it
In this jupyter notebook, we'll analyse italian COVID-19 data gathered from Dipartimento di Protezione Civile GitHub repository (link).
We'll consider following compartments

Daily variations
Percentages & Rates
Extra
TEST
Other plots on this website:
Also on this website:
def gauss_func(x, a, b, c):
d = - ((x - b) ** 2)
return a * np.exp( d / (c ** 2) )
def double_logistic(x, a1, b1, k1, a2, b2, k2):
d1 = k1 * (b1 - np.array(x))
l1 = a1 / (1 + np.exp(d1))
d2 = k2 * (b2 - np.array(x))
l2 = (a2 - a1) / (1 + np.exp(d2))
return l1 + l2 + 0
def double_gompertz_function(x, a1, b1, k1, a2, b2, k2, e):
exp1 = - np.exp(k1 * (b1 - x))
g1 = a1 * np.exp(exp1)
exp2 = - np.exp(k2 * (b2 - x))
g2 = (a2 - a1) * np.exp(exp2)
return g1 + g2 + e
def gompertz_function(x, a, b, k, e):
exp = - np.exp(k * (b - x))
g = a * np.exp(exp)
return g + e
def logistic_func(x, a1, b1, k1, e):
d1 = k1 * (b1 - np.array(x))
l1 = a1 / (1 + np.exp(d1))
return l1 + e
def logistic_func_no_e(x, a1, b1, k1):
d1 = k1 * (b1 - np.array(x))
l1 = a1 / (1 + np.exp(d1))
return l1
and get the most recently updated data from D.P.C. GitHub Repository, loading them into a dictionary.
italy = None
url = 'https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-andamento-nazionale/dpc-covid19-ita-andamento-nazionale.csv'
italy = pd.read_csv(
url,
usecols=[
'data',
'totale_casi', 'totale_positivi',
'nuovi_positivi', 'variazione_totale_positivi',
'deceduti', 'dimessi_guariti',
'isolamento_domiciliare', 'ricoverati_con_sintomi', 'terapia_intensiva'
],
parse_dates=['data'],
index_col=['data'],
squeeze=True).sort_index()
Let's check the dates of the downloaded data:
print(f"FIRST ENTRY DATE: {italy.index[0]}")
print(f"LAST ENTRY DATE: {italy.index[-1]}")
period = (
italy.index[-1] -
italy.index[0]
).days
print("COVERAGE: {} days".format(period))
print("CURRENT DATE IS: {}".format(dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
fig = make_subplots(specs=[[{"secondary_y": True}]])
fig.add_trace(
go.Bar(
name="Severe", x=italy.index, y=italy["terapia_intensiva"], marker_color="red"
), secondary_y=False,
)
fig.add_trace(
go.Bar(
name="Mild", x=italy.index, y=italy["ricoverati_con_sintomi"], marker_color="orange"
), secondary_y=False,
)
fig.add_trace(
go.Bar(
name="Quarantine", x=italy.index, y=italy["isolamento_domiciliare"], marker_color="yellow"
), secondary_y=False,
)
fig.add_trace(
go.Bar(
name="Recovered", x=italy.index, y=italy["dimessi_guariti"], marker_color="lime"
), secondary_y=False,
)
fig.add_trace(
go.Bar(
name="Deaths", x=italy.index, y=italy["deceduti"], marker_color="grey"
), secondary_y=False,
)
fig.add_trace(
go.Scatter(
name="Infected", x=italy.index, y=italy["totale_positivi"], marker_color="lightskyblue", line_shape='spline',
)
)
fig.add_trace(
go.Scatter(
name="Total", x=italy.index, y=italy["totale_casi"], marker_color="blue", line_shape='spline',
), secondary_y=False,
)
fig.update_layout(
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
xaxis={"gridcolor": '#bdbdbd'},
title={"text": "ITALY overview", "xanchor": "center", "x": 0.5},
hovermode="x unified", barmode='stack'
)
fig.update_yaxes(title_text="number", secondary_y=False, gridcolor='#bdbdbd')
fig.update_yaxes(title_text="new/day", zerolinecolor='#969696', secondary_y=True)
# find and fix outliers using Hampel filter
# Impl from: https://towardsdatascience.com/outlier-detection-with-hampel-filter-85ddf523c73d
def hampel_filter_pandas(input_series, window_size, n_sigmas=3.0):
k = 1.4826 # scale factor for Gaussian distribution
new_series = input_series.copy()
# helper lambda function
MAD = lambda x: np.median(np.abs(x - np.median(x)))
# the use of min_periods is to have rolling window extend towards
# the end of the data series; in effect, we can apply hampel filter
# to most recent observations
# taken from: https://stackoverflow.com/questions/48953313/pandas-rolling-window-boundary-on-start-end-of-series/48953314#48953314
rolling_window_size = 2*window_size+1
rolling_median = input_series.rolling(
window=rolling_window_size,
min_periods=(rolling_window_size//2),
center=True).median()
rolling_mad = k * input_series.rolling(
window=rolling_window_size,
min_periods=(rolling_window_size//2),
center=True).apply(MAD)
# print(f'rolling_mad = {rolling_mad}, rolling_median = {rolling_median}')
diff = np.abs(input_series - rolling_median)
where = diff > (n_sigmas * rolling_mad)
indices = np.argwhere(where.to_numpy()).flatten()
new_series[indices] = rolling_median[indices]
return new_series, indices
def hampel_filter_dataframe(dataframe, window, sigmas):
framedict = {'data': dataframe.index}
for col in dataframe.columns:
filtered, _ = hampel_filter_pandas(dataframe[col], window, sigmas)
framedict.update({col: filtered})
newframe = pd.DataFrame(framedict)
newframe.set_index('data', inplace=True)
return newframe
filtered_italy = hampel_filter_dataframe(italy, 7, 2)
smoothed_italy = filtered_italy.rolling(14,
win_type='gaussian',
min_periods=1,
center=True).mean(std=5).round()
fig = go.Figure(data=go.Scatter(
x=italy.index, y=italy["nuovi_positivi"],
mode='lines+markers',
marker_color="lightgrey", marker_size=5, marker_symbol="circle", marker_line_width=1,
line={"dash": "dot"}, line_shape='spline',
name="new cases/day"
))
fig.add_trace(
go.Scatter(
x=smoothed_italy.index, y=smoothed_italy["nuovi_positivi"],
line_color="blue",
legendgroup="smoothed", name="smoothed"
)
)
fig.update_layout(legend_orientation="h",
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
yaxis={"gridcolor": '#bdbdbd', "zerolinecolor": '#969696'},
xaxis={"gridcolor": '#bdbdbd'},
title={"text": "ITALY (new cases/day)", "xanchor": "center", "x": 0.5},
yaxis_title="new cases/day", hovermode="x"
)
pyo.iplot(fig)
fig = go.Figure(data=go.Scatter(
x=italy.index, y=italy["deceduti"].diff(),
mode='lines+markers',
marker_color="grey", marker_size=5, marker_symbol="diamond", marker_line_width=1,
line={"dash": "dot"}, line_shape='spline',
name="new deaths/day"
))
fig.add_trace(
go.Scatter(
x=smoothed_italy.index, y=smoothed_italy["deceduti"].diff(),
line_color="red",
legendgroup="smoothed", name="smoothed"
)
)
fig.update_layout(legend_orientation="h",
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
yaxis={"gridcolor": '#bdbdbd', "zerolinecolor": '#969696'},
xaxis={"gridcolor": '#bdbdbd'},
title={"text": "ITALY (new deaths/day)", "xanchor": "center", "x": 0.5},
yaxis_title="new deaths/day", hovermode="x"
)
pyo.iplot(fig)
fig = go.Figure(data=go.Scatter(
x=italy.index, y=italy["dimessi_guariti"].diff(),
mode='lines+markers',
marker_color="grey", marker_size=5, marker_symbol="square", marker_line_width=1,
line={"dash": "dot"}, line_shape='spline',
name="new recovered/day"
))
fig.add_trace(
go.Scatter(
x=smoothed_italy.index, y=smoothed_italy["dimessi_guariti"].diff(),
line_color="lime",
legendgroup="smoothed", name="smoothed"
)
)
fig.update_layout(legend_orientation="h",
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
yaxis={"gridcolor": '#bdbdbd', "zerolinecolor": '#969696'},
xaxis={"gridcolor": '#bdbdbd'},
title={"text": "ITALY (new recovered/day)", "xanchor": "center", "x": 0.5},
yaxis_title="new recovered/day", hovermode="x"
)
pyo.iplot(fig)
fig = go.Figure(data=go.Scatter(
x=italy.index, y=italy["variazione_totale_positivi"],
mode='lines+markers',
marker_color="grey", marker_size=5, marker_symbol="x", marker_line_width=1,
line={"dash": "dot"}, line_shape='spline',
name="new infected/day"
))
fig.add_trace(
go.Scatter(
x=smoothed_italy.index, y=smoothed_italy["variazione_totale_positivi"],
line_color="lightskyblue",
legendgroup="smoothed", name="smoothed"
)
)
fig.update_layout(legend_orientation="h",
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
yaxis={"gridcolor": '#bdbdbd', "zerolinecolor": '#969696'},
xaxis={"gridcolor": '#bdbdbd'},
title={"text": "ITALY (new infected/day)", "xanchor": "center", "x": 0.5},
yaxis_title="new infected/day", hovermode="x"
)
pyo.iplot(fig)
And finally take a look to the complete result, hiding original data, flexes and vertical lines:
fig = go.Figure()
# CONFIRMED
fig.add_trace(
go.Scatter(
x=smoothed_italy.index, y=smoothed_italy["nuovi_positivi"],
line_color="blue",
legendgroup="cases", name="cases"
)
)
# DEATHS
fig.add_trace(
go.Scatter(
x=smoothed_italy.index, y=smoothed_italy["deceduti"].diff(),
line_color="red",
legendgroup="deaths", name="deaths"
)
)
# RECOVERD
fig.add_trace(
go.Scatter(
x=smoothed_italy.index, y=smoothed_italy["dimessi_guariti"].diff(),
line_color="lime",
legendgroup="recovered", name="recovered"
)
)
# INFECTED
fig.add_trace(
go.Scatter(
x=smoothed_italy.index, y=smoothed_italy["variazione_totale_positivi"],
line_color="lightskyblue",
legendgroup="infected", name="infected"
)
)
fig.update_layout(legend_orientation="h",
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
yaxis={"gridcolor": '#bdbdbd', "zerolinecolor": '#969696'},
xaxis={"gridcolor": '#bdbdbd'},
title={"text": "ITALY (new/day)", "xanchor": "center", "x": 0.5},
yaxis_title="new/day", hovermode="x"
)
pyo.iplot(fig)
fig = go.Figure()
fig.add_trace(
go.Scatter(
x=smoothed_italy.index, y=smoothed_italy["totale_casi"],
line_color="blue",
name="total cases"
)
)
fig.add_trace(
go.Scatter(
x=smoothed_italy.index, y=smoothed_italy["deceduti"],
line_color="red",
name="deaths"
)
)
fig.add_trace(
go.Scatter(
x=smoothed_italy.index, y=smoothed_italy["dimessi_guariti"],
line_color="green",
name="recovered"
)
)
fig.add_trace(
go.Scatter(
x=smoothed_italy.index, y=smoothed_italy["totale_positivi"],
line_color="lightskyblue",
name="infected"
)
)
fig.update_layout(legend_orientation="h",
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
yaxis={"gridcolor": '#bdbdbd', "zerolinecolor": '#969696'},
xaxis={"gridcolor": '#bdbdbd'},
title={"text": "ITALY (cases)", "xanchor": "center", "x": 0.5},
yaxis_title="cases", hovermode="x"
)
pyo.iplot(fig)
fig = go.Figure(data=go.Scatter(
x=italy.index, y=italy["totale_casi"].pct_change(),
mode='lines+markers',
marker_color="blue", marker_size=5, marker_symbol="circle", marker_line_width=1,
line={"dash": "dot"}, line_shape='spline',
name="cases"
))
fig.update_layout(legend_orientation="h",
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
yaxis={"gridcolor": '#bdbdbd', "zerolinecolor": '#969696', "tickformat": '+,.2%',},
xaxis={"gridcolor": '#bdbdbd'},
title={"text": "ITALY percentage variation (cases)", "xanchor": "center", "x": 0.5},
yaxis_title="cases", hovermode="x"
)
pyo.iplot(fig)
fig = go.Figure(data=go.Scatter(
x=italy.index, y=italy["deceduti"].pct_change(),
mode='lines+markers',
marker_color="red", marker_size=5, marker_symbol="diamond", marker_line_width=1,
line={"dash": "dot"}, line_shape='spline',
name="deaths"
))
fig.update_layout(legend_orientation="h",
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
yaxis={"gridcolor": '#bdbdbd', "zerolinecolor": '#969696', "tickformat": '+,.2%',},
xaxis={"gridcolor": '#bdbdbd'},
title={"text": "ITALY percentage variation (deaths)", "xanchor": "center", "x": 0.5},
yaxis_title="deaths", hovermode="x"
)
pyo.iplot(fig)
fig = go.Figure(data=go.Scatter(
x=italy.index[4:], y=italy["dimessi_guariti"][4:].pct_change(),
mode='lines+markers',
marker_color="lime", marker_size=5, marker_symbol="square", marker_line_width=1,
line={"dash": "dot"}, line_shape='spline',
name="recovered"
))
fig.update_layout(legend_orientation="h",
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
yaxis={"gridcolor": '#bdbdbd', "zerolinecolor": '#969696', "tickformat": '+,.2%',},
xaxis={"gridcolor": '#bdbdbd'},
title={"text": "ITALY percentage variation (recovered)", "xanchor": "center", "x": 0.5},
yaxis_title="recovered", hovermode="x"
)
pyo.iplot(fig)
fig = go.Figure(data=go.Scatter(
x=italy.index, y=italy["totale_positivi"].pct_change(),
mode='lines+markers',
marker_color="lightskyblue", marker_size=5, marker_symbol="x", marker_line_width=1,
line={"dash": "dot"}, line_shape='spline',
name="infected"
))
fig.update_layout(legend_orientation="h",
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
yaxis={"gridcolor": '#bdbdbd', "zerolinecolor": '#969696', "tickformat": '+,.2%',},
xaxis={"gridcolor": '#bdbdbd'},
title={"text": "ITALY percentage variation (infected)", "xanchor": "center", "x": 0.5},
yaxis_title="infected", hovermode="x"
)
pyo.iplot(fig)
!!! PLEASE NOTE !!!
These rates are only useful for SIRD epidemiological model (read here for details) not to define COVID-19 actual rates.
fig = go.Figure()
fig.add_trace(go.Scatter(
x=italy.index, y=italy["deceduti"]/italy["totale_casi"],
mode='lines+markers',
marker_size=3, marker_symbol="cross",
line_shape='spline',
name="mortality rate",
marker_color="red",
hovertemplate="mortality rate<br>%{y:.2%}"
))
fig.add_trace(go.Scatter(
x=italy.index, y=italy["dimessi_guariti"]/italy["totale_casi"],
mode='lines+markers',
marker_size=3, marker_symbol="diamond",
line_shape='spline',
name="recovery rate",
marker_color="lime",
hovertemplate="recovery rate<br>%{y:.2%}"
))
fig.update_layout(legend_orientation="h",
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
yaxis={"gridcolor": '#bdbdbd', "zerolinecolor": '#969696', "tickformat": ',.0%',},
xaxis={"gridcolor": '#bdbdbd'},
title={"text": "ITALY rates", "xanchor": "center", "x": 0.5},
yaxis_title="percentage", hovermode="x"
)
pyo.iplot(fig)
fig = go.Figure()
fig.add_trace(go.Scatter(
x=italy.index, y=italy["ricoverati_con_sintomi"],
mode='lines+markers',
marker_size=3, marker_symbol="circle",
line_shape='spline',
name="with symptoms"
))
fig.add_trace(go.Scatter(
x=italy.index, y=italy["terapia_intensiva"],
mode='lines+markers',
marker_size=3, marker_symbol="circle",
line_shape='spline',
name="emergency"
))
fig.add_trace(go.Scatter(
x=italy.index, y=italy["isolamento_domiciliare"],
mode='lines+markers',
marker_size=3, marker_symbol="circle",
line_shape='spline',
name="home"
))
fig.update_layout(legend_orientation="h",
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
yaxis={"gridcolor": '#bdbdbd', "zerolinecolor": '#969696'},
xaxis={"gridcolor": '#bdbdbd'},
title={"text": "ITALY (current hospitalized)", "xanchor": "center", "x": 0.5},
yaxis_title="number", hovermode="x"
)
pyo.iplot(fig)
google = pd.read_csv(
"https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv",
usecols=[
"date", "country_region_code", "country_region", "sub_region_1", "sub_region_2",
"retail_and_recreation_percent_change_from_baseline",
"grocery_and_pharmacy_percent_change_from_baseline",
"parks_percent_change_from_baseline",
"transit_stations_percent_change_from_baseline",
"workplaces_percent_change_from_baseline",
"residential_percent_change_from_baseline"
],
parse_dates=['date'], dtype={"sub_region_1": str, "sub_region_2": str},
index_col=["date"]
)
google.to_pickle("google-mobility.pkl")
ITALY = google.loc[google["country_region_code"] == "IT"]
National = ITALY.loc[ITALY.fillna("NONE")["sub_region_1"] == "NONE"]
fig = go.Figure()
for column in National.columns[4:]:
fig.add_trace(
go.Scatter(
x=National.index,
y=National[column],
name=column.replace("_", " ").title().split(" Percent")[0]
)
)
fig.add_trace(
go.Scatter(
x=[pd.Timestamp("2020-03-11"), pd.Timestamp("2020-03-11")], y=[-100, 100],
showlegend=False, name="lockdown", mode="lines",
line=dict(color='black', width=2, dash='dot'),
)
)
fig.add_trace(
go.Scatter(
x=[pd.Timestamp("2020-05-04"), pd.Timestamp("2020-05-04")], y=[-100, 100],
showlegend=False, name="1st relax", mode="lines",
line=dict(color='black', width=2, dash='dot'),
)
)
fig.add_trace(
go.Scatter(
x=[
pd.Timestamp("2020-05-04"), pd.Timestamp("2020-05-04"),
pd.Timestamp("2020-03-11"), pd.Timestamp("2020-03-11")],
y=[-100, 100, 100, -100],
fill="toself", fillcolor="rgba(125,125,125,.05)",
showlegend=False, name="lockdown \U00000394",
line_color="rgba(0,0,0,0)", mode="lines+text",
text=["Lockdown end", "Lockdown end", "Lockdown start", "Lockdown start"]
)
)
fig.update_layout(
legend_orientation="h",
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
yaxis={"gridcolor": '#bdbdbd', "zerolinecolor": '#969696'},
xaxis={"gridcolor": '#bdbdbd'},
title={"text": f"Italy mobility (last update {National.index[-1].date()})", "xanchor": "center", "x": 0.5},
yaxis_title="Percentage Change from Baseline", hovermode="x unified"
)
pyo.iplot(fig)
If the number of Infected $\mathbf{I} \ll \mathbf{N}$ total population (in the SIR model, $\mathbf{S}$usceptible $\rightarrow \mathbf{I}$nfected $ \rightarrow \mathbf{R}$emoved), the basic reproduction number $R_0$ can be calculated as
$$ R_0 \simeq \frac{\beta}{\gamma} $$and knowing that
$$ d \mathbf{R} = \gamma \mathbf{I} $$$$ d \mathbf{I} = (\beta - \gamma)\mathbf{I} $$we find that
$$ \gamma = \frac{ d \mathbf{R} }{ \mathbf{I} }$$$$ \beta = \frac{d \mathbf{I}}{\mathbf{I}} + \gamma = \frac{d \mathbf{I} + d \mathbf{R}}{\mathbf{I}} $$so
$$ R_0 \simeq \frac{ \frac{d \mathbf{I} + d \mathbf{R}}{\mathbf{I}} }{ \frac{ d \mathbf{R} }{ \mathbf{I} } } = \frac{d \mathbf{I} + d \mathbf{R}}{d \mathbf{R}} = \frac{d \mathbf{I}}{d \mathbf{R}} + 1$$thus since
$$ d \mathbf{R} \geq 0 $$we can say that
$$ d \mathbf{I} > 0 \Rightarrow R_0 > 1 $$$$ d \mathbf{I} = 0 \Rightarrow R_0 = 1 $$$$ d \mathbf{I} < 0 \Rightarrow R_0 < 1 $$So when the daily number of new infected is greater than zero, $R_0$ is greater than 1 and could be a sign of an out-of-control epidemy.
When calculated with simplified methods like this (and using ordinary differential equations) $R_0$ is, in fact, simply a threshold, not the average number of secondary infections and can only determine whether an epidemy will die out ($R_0 < 1$) or not ($R_0 > 1$).
Let's try to calculate $R_0$ for Italy, using the iterpolated and smoothed data (to smooth daily fluctuations):
Recovered + Deaths data Infected daily variation generated by the sum of daily variation data for Confirmed, Deaths and Recovered Infected curve generated by the sum of Confirmed, Deaths and Recovereddef smooth(y):
ydf = pd.DataFrame(
data={
"y": y
}
)
ydf_smoothed = ydf.rolling(7,
win_type='gaussian',
min_periods=1,
center=True).mean(std=2).round()
return ydf_smoothed
dI = italy["variazione_totale_positivi"]
dR = italy["dimessi_guariti"].diff()
dD = italy["deceduti"].diff()
sdI = smoothed_italy["variazione_totale_positivi"]
sdR = smoothed_italy["dimessi_guariti"].diff()
sdD = smoothed_italy["deceduti"].diff()
R0 = dI / (dR + dD) + 1
sR0 = sdI / (sdR + sdD) + 1
fig = go.Figure()
fig.add_trace(
go.Scatter(
x=italy.index[1:],
y=R0[1:],
marker=dict(
size=6,
line=dict(width=1, color="black"),
color=R0[1:],
cmin=0,
cmax=6,
colorbar=dict(
title="R<sub>t</sub>"
),
colorscale=[
[0., "rgba(0,150,0,1)"],
[1./6, "rgba(255, 255, 255, 1)"],
[1., "rgba(255,0,0,1)"],
]
),
mode="markers+lines",
line = dict(color='grey', width=2, dash='dot'),
name="R<sub>0</sub>",
))
fig.add_trace(
go.Scatter(
x=italy.index[1:],
y=sR0[1:],
marker=dict(
size=6,
line=dict(width=1, color="black"),
color=R0[1:],
cmin=0,
cmax=6,
colorbar=dict(
title="R<sub>t</sub>"
),
colorscale=[
[0., "rgba(0,150,0,1)"],
[1./6, "rgba(255, 255, 255, 1)"],
[1., "rgba(255,0,0,1)"],
]
),
mode="lines",
line = dict(color='blue', width=4),
name="R<sub>0</sub> smooth",
))
fig.add_trace(
go.Scatter(
x=[italy.index[1], italy.index[-1]], y=[1, 1],
#fill="tozeroy",
#fillcolor="rgba(0, 250, 250, .15)",
mode="lines",
line_color="rgba(0, 0, 0, .5)",
name="R<sub>0</sub> < 1", showlegend=False
)
)
fig.add_trace(
go.Scatter(
x=[pd.Timestamp("2020-03-11"), pd.Timestamp("2020-03-11")], y=[-100, 100],
showlegend=False, name="lockdown", mode="lines",
line=dict(color='black', width=2, dash='dot'),
)
)
fig.add_trace(
go.Scatter(
x=[pd.Timestamp("2020-05-04"), pd.Timestamp("2020-05-04")], y=[-100, 100],
showlegend=False, name="1st relax", mode="lines",
line=dict(color='black', width=2, dash='dot'),
)
)
fig.add_trace(
go.Scatter(
x=[
pd.Timestamp("2020-05-04"), pd.Timestamp("2020-05-04"),
pd.Timestamp("2020-03-11"), pd.Timestamp("2020-03-11")],
y=[-100, 100, 100, -100],
fill="toself", fillcolor="rgba(125,125,125,.05)",
showlegend=False, name="lockdown \U00000394",
line_color="rgba(0,0,0,0)", mode="lines",
)
)
fig.update_layout(legend_orientation="h",
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
yaxis={"gridcolor": '#bdbdbd', "zerolinecolor": '#969696', "range": [0, 15]},
xaxis={"gridcolor": '#bdbdbd'},
title={"text": "ITALY (R<sub>0</sub>)", "xanchor": "center", "x": 0.5},
yaxis_title="$R_0$", hovermode="x unified"
)
pyo.iplot(fig)
Italy $R_t$ (effective reproduction number in time $t$) calculated with Bayesian approach (Bettencourt & Ribeiro 2008 and Kevin Systrom 2020).
The vertical dashed line is the day of the national lockdown (dashed) and first lockdown relax (dotted).
Method: italian (PDF)
_ = """italy = None
url = 'https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-andamento-nazionale/dpc-covid19-ita-andamento-nazionale.csv'
italy = pd.read_csv(url,
usecols=['data', 'nuovi_positivi'],
parse_dates=['data'],
index_col=['data'],
squeeze=True).sort_index()"""
# We create an array for every possible value of Rt
R_T_MAX = -6
R_T_MAX = 6
r_t_range = np.linspace(0, R_T_MAX, R_T_MAX*100+1)
# Gamma is 1/serial interval
# https://wwwnc.cdc.gov/eid/article/26/7/20-0282_article
# https://www.nejm.org/doi/full/10.1056/NEJMoa2001316
GAMMA = 1/7.5
def prepare_cases(cases, cutoff=1, filter_win=7, filter_std=2, smooth_win=14, smooth_std=5):
cases[cases < 0] = 0
cases, cases_outliers = hampel_filter_pandas(cases, filter_win, filter_std)
smoothed = cases.rolling(smooth_win,
win_type='gaussian',
min_periods=1,
center=True).mean(std=smooth_std).round()
smoothed[smoothed < 0] = 0
idx_start = np.searchsorted(smoothed, cutoff)
smoothed = smoothed.iloc[idx_start:]
original = cases.loc[smoothed.index]
return original, smoothed
def highest_density_interval(pmf, p=.95):
# If we pass a DataFrame, just call this recursively on the columns
if(isinstance(pmf, pd.DataFrame)):
return pd.DataFrame([highest_density_interval(pmf[col], p=p) for col in pmf],
index=pmf.columns)
cumsum = np.cumsum(pmf.values)
# N x N matrix of total probability mass for each low, high
total_p = cumsum - cumsum[:, None]
# Return all indices with total_p > p
lows, highs = np.array([]), np.array([])
j = 0
while not lows.size or not highs.size:
lows, highs = (total_p > (p-j)).nonzero()
j += .05
try:
# Find the smallest range (highest density)
best = (highs - lows).argmin()
except Exception as e:
print("ERR {}".format(e))
best = 0
low = pmf.index[lows[best]]
high = pmf.index[highs[best]]
return pd.Series([low, high],
index=[f'Low_{p*100:.0f}',
f'High_{p*100:.0f}'])
def HDI_of_grid(probMassVec, credMass=0.95):
sortedProbMass = np.sort(probMassVec, axis=None)[::-1]
HDIheightIdx = np.min(np.where(np.cumsum(sortedProbMass) >= credMass))
HDIheight = sortedProbMass[HDIheightIdx]
HDImass = np.sum(probMassVec[probMassVec >= HDIheight])
idx = np.where(probMassVec >= HDIheight)[0]
return {'indexes':idx, 'mass':HDImass, 'height':HDIheight}
def HDI_of_grid_from_df(pmf, p):
# If we pass a DataFrame, just call this recursively on the columns
if(isinstance(pmf, pd.DataFrame)):
return pd.DataFrame([HDI_of_grid_from_df(pmf[col], p=p) for col in pmf],
index=pmf.columns)
res = HDI_of_grid(pmf, p)
#print(res["indexes"])
lo_idx = res["indexes"][0]
hi_idx = res["indexes"][-1]
lo = pmf.index[lo_idx]
hi = pmf.index[hi_idx]
return pd.Series([lo, hi],
index=[f'Low_{p*100:.0f}',
f'High_{p*100:.0f}'])
def HDIs(pmf, P=[.95, .5]):
RES = []
for p in P:
res = HDI_of_grid_from_df(pmf, p=p)
RES.append(res)
return RES
def HDI_of_grid(probMassVec, credMass=0.95):
sortedProbMass = np.sort(probMassVec, axis=None)[::-1]
HDIheightIdx = np.min(np.where(np.cumsum(sortedProbMass) >= credMass))
HDIheight = sortedProbMass[HDIheightIdx]
HDImass = np.sum(probMassVec[probMassVec >= HDIheight])
idx = np.where(probMassVec >= HDIheight)[0]
return {'indexes':idx, 'mass':HDImass, 'height':HDIheight}
def HDI_of_grid_from_df(pmf, p):
# If we pass a DataFrame, just call this recursively on the columns
if(isinstance(pmf, pd.DataFrame)):
return pd.DataFrame([HDI_of_grid_from_df(pmf[col], p=p) for col in pmf],
index=pmf.columns)
res = HDI_of_grid(pmf, p)
#print(res["indexes"])
lo_idx = res["indexes"][0]
hi_idx = res["indexes"][-1]
lo = pmf.index[lo_idx]
hi = pmf.index[hi_idx]
return pd.Series([lo, hi],
index=[f'Low_{p*100:.0f}',
f'High_{p*100:.0f}'])
def get_posteriors(sr, sigma=0.15):
# (1) Calculate Lambda
lam = sr[:-1].values * np.exp(GAMMA * (r_t_range[:, None] - 1))
# (2) Calculate each day's likelihood
likelihoods = pd.DataFrame(
data = sps.poisson.pmf(sr[1:].values, lam),
index = r_t_range,
columns = sr.index[1:])
# (3) Create the Matrix
process_matrix = sps.norm(loc=r_t_range,
scale=sigma
).pdf(r_t_range[:, None])
# (3a) Normalize all rows to sum to 1
process_matrix /= process_matrix.sum(axis=0)
# (4) Calculate the initial prior
prior0 = sps.gamma(a=4).pdf(r_t_range)
prior0 /= prior0.sum()
# Create a DataFrame that will hold our posteriors for each day
# Insert our prior as the first posterior.
posteriors = pd.DataFrame(
index=r_t_range,
columns=sr.index,
data={sr.index[0]: prior0}
)
# We said we'd keep track of the sum of the log of the probability
# of the data for maximum likelihood calculation.
log_likelihood = 0.0
# (5) Iteratively apply Bayes' rule
for previous_day, current_day in zip(sr.index[:-1], sr.index[1:]):
#(5a) Calculate the new prior
current_prior = process_matrix @ posteriors[previous_day]
#(5b) Calculate the numerator of Bayes' Rule: P(k|R_t)P(R_t)
numerator = likelihoods[current_day] * current_prior
#(5c) Calcluate the denominator of Bayes' Rule P(k)
denominator = np.sum(numerator)
# Execute full Bayes' Rule
posteriors[current_day] = numerator/denominator
# Add to the running sum of log likelihoods
log_likelihood += np.log(denominator)
return posteriors, log_likelihood
def plotly_rt(result, name):
index = result['ML'].index.get_level_values('data')
values = result['ML'].values
# Aesthetically, extrapolate credible interval by 1 day either side
lofn = interp1d(date2num(index),
result['Low_95'].values,
bounds_error=False,
fill_value='extrapolate')
hifn = interp1d(date2num(index),
result['High_95'].values,
bounds_error=False,
fill_value='extrapolate')
# Aesthetically, extrapolate credible interval by 1 day either side
lofn50 = interp1d(date2num(index),
result['Low_50'].values,
bounds_error=False,
fill_value='extrapolate')
hifn50 = interp1d(date2num(index),
result['High_50'].values,
bounds_error=False,
fill_value='extrapolate')
extended = pd.date_range(start=index[0], end=index[-1]+pd.Timedelta(days=1))
hyperextended = pd.date_range(start=index[0]-pd.Timedelta(days=1), end=index[-1]+pd.Timedelta(days=2))
fig = go.Figure()
fig.add_trace(
go.Scatter(
x=[index[0]-pd.Timedelta(days=10), index[-1]+pd.Timedelta(days=10)], y=[1, 1],
line = dict(color='black', width=1), showlegend=False,
)
)
fig.add_trace(
go.Scatter(
x=[pd.Timestamp("2020-03-11"), pd.Timestamp("2020-03-11")], y=[-100, 100],
showlegend=False, name="lockdown", mode="lines",
line=dict(color='black', width=2, dash='dot'),
)
)
fig.add_trace(
go.Scatter(
x=[pd.Timestamp("2020-05-04"), pd.Timestamp("2020-05-04")], y=[-100, 100],
showlegend=False, name="1st relax", mode="lines",
line=dict(color='black', width=2, dash='dot'),
)
)
fig.add_trace(
go.Scatter(
x=[
pd.Timestamp("2020-05-04"), pd.Timestamp("2020-05-04"),
pd.Timestamp("2020-03-11"), pd.Timestamp("2020-03-11")],
y=[0, 4, 4, 0],
fill="toself", fillcolor="rgba(125,125,125,.05)",
showlegend=False, name="lockdown \U00000394",
line_color="rgba(0,0,0,0)", mode="lines",
)
)
fig.add_trace(
go.Scatter(
x=index, y=lofn50(date2num(extended)),
line_color="rgba(0,0,0,.25)", showlegend=False,
name="lo50 R<sub>t</sub>"
)
)
fig.add_trace(
go.Scatter(
x=index, y=hifn50(date2num(extended)),
line_color="rgba(0,0,0,.25)", showlegend=False,
fill="tonexty", fillcolor="rgba(0,0,0,.1)",
name="hi50 R<sub>t</sub>"
)
)
fig.add_trace(
go.Scatter(
x=index, y=lofn(date2num(extended)),
line_color="rgba(0,0,0,.1)", showlegend=False,
name="lo95 R<sub>t</sub>"
)
)
fig.add_trace(
go.Scatter(
x=index, y=hifn(date2num(extended)),
line_color="rgba(0,0,0,.1)", showlegend=False,
fill="tonexty", fillcolor="rgba(0,0,0,.1)",
name="hi95 R<sub>t</sub>"
)
)
fig.add_trace(
go.Scatter(
x=index, y=values,
marker=dict(
size=5,
line=dict(width=1, color="black"),
color=values,
cmin=0,
cmax=max(values),
colorbar=dict(
title="R<sub>t</sub>"
),
colorscale=[
[0., "rgba(0,150,0,1)"],
[1./max(values), "rgba(255, 255, 255, 1)"],
[1., "rgba(255,0,0,1)"],
]
),
mode="markers+lines", showlegend=False,
line = dict(color='grey', width=2, dash='dot'),
name="R<sub>t</sub>",
)
)
fig.update_layout(legend_orientation="h",
showlegend=True, plot_bgcolor='rgba(0,0,0,0)',
yaxis={"gridcolor": '#bdbdbd', "zerolinecolor": '#969696', "range":[0, max(values)+1]},
xaxis={"gridcolor": '#bdbdbd', "range":[hyperextended[0], hyperextended[-1]]},
title={"text": "Real time {} R<sub>t</sub>".format(name), "xanchor": "center", "x": 0.5},
yaxis_title="$R_t$", hovermode="x unified"
)
pyo.iplot(fig)
return values[-1], result['High_95'].values[-1], result['Low_95'].values[-1], result
original, smoothed = prepare_cases(italy['nuovi_positivi'])
# Note that we're fixing sigma to a value just for the example
posteriors, log_likelihood = get_posteriors(smoothed, sigma=.25)
# Note that this takes a while to execute - it's not the most efficient algorithm
#hdis = highest_density_interval(posteriors)
HDIS = HDIs(posteriors)
most_likely = posteriors.idxmax().rename('ML')
result = most_likely
for hdis in HDIS:
result = pd.concat([result, hdis], axis=1)
R_mean, R_max, R_min, RT = plotly_rt(result, "ITALY")
__R0 = {"mean": R_mean, "max": R_max, "min": R_min}
with open("R0_italy.json", "w") as f:
json.dump(__R0, f)
Look in Italy Regions Overview to view $R_t$ for each region.
Effective reproduction number $R_t$ can be considered as an ODD: if for example $R_t=2$, means like in gambling that the likelihood of contagion is "given 2 to 1", so an infected will infect 2 susceptible subjects.
For the effect of the serial interval $\tau$ (onset of symptoms, diagnosis, etc), new cases $k_t$ observed in day $t$ have been infected in $t-\tau$ by the current new cases observed in $t-\tau$ (because old cases are supposed to be hospitalized, isolated, recovered or dead) so, with this simple method, in $t$ we can calculate the $R_{t-\tau}$.
$$ R_{t-\tau} = \frac{k_{t}}{k_{t-\tau}} $$Work in progress: distribute serial interval $\tau$ as Gamma and new cases $k_t$ as Poisson.
tau = 7
R = italy['nuovi_positivi'][tau-1:].values / italy['nuovi_positivi'][:-tau+1].values
sR = smoothed[tau-1:].values / smoothed[:-tau+1].values
ES = np.sqrt( 1/italy['nuovi_positivi'][tau-1:].values + 1/italy['nuovi_positivi'][:-tau+1].values )
sES = np.sqrt( 1/smoothed[tau-1:].values + 1/smoothed[:-tau+1].values )
fig, ax = plt.subplots(figsize=(15,7))
ax.plot(italy.index[:-tau+1], R, 'ok:', alpha=.5, ms=3, label="Raw $R_t$")
ax.fill_between(
italy.index[:-tau+1],
R-ES, R+ES, alpha=.2,
)
ax.plot(italy.index[:-tau+1], sR, 'r-', lw=2, zorder=10, label="Smoothed")
ax.fill_between(
italy.index[:-tau+1],
sR-sES, sR+sES, alpha=.4, label="S.E."
)
#ax.axhline(0, c="k", alpha=.7)
ax.axhline(1, c="k", alpha=.3)
ax.axvline(italy.index[0], c="k", ls=":", label=italy.index[0].date())
ax.fill_betweenx(
[0, max(R)],
italy.index[-tau], italy.index[-1],
color="g", alpha=.2,
label=fr"$\tau = {tau}$ days")
ax.axvline(italy.index[-1], c="r", ls=":", label=italy.index[-1].date())
ax.set_ylabel("$R_t$")
ax.set_xlabel("Data")
ax.set_ylim(0, 4)
fig.set_facecolor("w")
ax.legend()
ax.set_title(r"Italy $R_t$ as ODDS with serial interval $\tau$")
plt.plot();
_ = """r_t_range = np.linspace(-6, 6, R_T_MAX*100+1)
# Note that we're fixing sigma to a value just for the example
posteriors, log_likelihood = get_posteriors(smoothed, sigma=.25)
# Note that this takes a while to execute - it's not the most efficient algorithm
#hdis = highest_density_interval(posteriors)
HDIS = HDIs(posteriors)
most_likely = posteriors.idxmax().rename('ML')
RT = most_likely
for hdis in HDIS:
RT = pd.concat([RT, hdis], axis=1)
smoothed_RT = RT['ML'].rolling(8,
win_type='gaussian',
min_periods=1,
center=True).mean(std=2)
SIGMA_RATE = 2
total_posteriors = {}
day = smoothed_RT.index.size-1
pred_RT_x = np.linspace(day, day+6, 7)
pred_RT_d = pd.date_range(RT.index[day]+pd.Timedelta(days=1), RT.index[day]+pd.Timedelta(days=7))
#print(f"RT_x: {pred_RT_x}")
#print(f"RT_d: {pred_RT_d}")
RT_diff = smoothed_RT[day-8:day].diff()[1:]
pred_RT_y = smoothed_RT.values[day] + RT_diff.values
#print(RT_diff)
#print(pred_RT_y)
#print(f"RT_y: {pred_RT_y}")
#pred_RT_y[pred_RT_y<0] = 0
Rt_scenario_lo = pred_RT_y - .5
Rt_scenario_ML = pred_RT_y
Rt_scenario_hi = pred_RT_y + .5
posteriors = {
"lo": [],
"ML": [],
"hi": [],
}
k_range = np.linspace(1, 20000, 2000).astype(int)
prior0 = sps.norm(smoothed.values[day], 1).pdf(k_range)
prior0 /= prior0.sum()
Rt_scenarios = [
Rt_scenario_lo,
Rt_scenario_ML,
Rt_scenario_hi
]
for Rt_val, scenario in zip(Rt_scenarios, posteriors):
#print(f"scenario: {scenario}")
#lam0 = italy.values[-1] * np.exp(GAMMA * (Rt_value - 1))
#likelihood0 = sps.poisson.pmf(k_range, lam0)
#numerator0 = prior0 * likelihood0
#denominator0 = numerator0.sum()
#posterior0 = prior0
posteriors[scenario].append(prior0)
for i in range(7):
#print(f"pred {i}")
Po_ML = k_range[posteriors[scenario][-1].argmax()]
lam = Po_ML * np.exp(GAMMA * (Rt_val[i] - 1))
likelihood = sps.poisson.pmf(k_range, lam)
sigma = k_range[posteriors[scenario][-1].argmax()]/SIGMA_RATE
process_matrix = sps.norm(
loc=k_range,
scale=sigma
).pdf(k_range[:,None])
process_matrix /= process_matrix.sum(axis=0)
prior = posteriors[scenario][-1] @ process_matrix
numerator = prior * likelihood
denominator = numerator.sum()
posterior = numerator / denominator
posteriors[scenario].append(posterior)
#print(f"Po: {len(posteriors[scenario])}")
dates = pd.date_range(
italy.index[day]+pd.Timedelta(days=0),
italy.index[day]+pd.Timedelta(days=7),
)
res_lo = pd.DataFrame(index=dates, columns=["lo", "ML", "hi"])
res_ML = pd.DataFrame(index=dates, columns=["lo", "ML", "hi"])
res_hi = pd.DataFrame(index=dates, columns=["lo", "ML", "hi"])
p = .95
for i, _day in enumerate(posteriors["ML"]):
res = HDI_of_grid(_day, p)
lo_idx = res["indexes"][0]
hi_idx = res["indexes"][-1]
lo = k_range[lo_idx]
hi = k_range[hi_idx]
ML = k_range[_day.argmax()]
res_ML["lo"][dates[i]] = lo
res_ML["ML"][dates[i]] = ML
res_ML["hi"][dates[i]] = hi
for i, _day in enumerate(posteriors["lo"]):
res = HDI_of_grid(_day, p)
lo_idx = res["indexes"][0]
hi_idx = res["indexes"][-1]
lo = k_range[lo_idx]
hi = k_range[hi_idx]
ML = k_range[_day.argmax()]
res_lo["lo"][dates[i]] = lo
res_lo["ML"][dates[i]] = ML
res_lo["hi"][dates[i]] = hi
for i, _day in enumerate(posteriors["hi"]):
res = HDI_of_grid(_day, p)
lo_idx = res["indexes"][0]
hi_idx = res["indexes"][-1]
lo = k_range[lo_idx]
hi = k_range[hi_idx]
ML = k_range[_day.argmax()]
res_hi["lo"][dates[i]] = lo
res_hi["ML"][dates[i]] = ML
res_hi["hi"][dates[i]] = hi
# results
fig, ax = plt.subplots(figsize=(15,7))
ax.plot(original[day-14:day+8].index, original[day-14:day+8].values, 'o:', lw=1, c="k", label="Observed")
smoothed[day-14:day+8].plot(ax=ax, lw=2, c="k", label="Observed Smoothed")
ax.plot(
res_ML["ML"].index,
res_ML["ML"].values,
c="b", lw=3
)
ax.fill_between(
res_ML["ML"].index,
res_ML["lo"].values.astype(int),
res_ML["hi"].values.astype(int),
color="b", alpha=.3, label="Most Likely scenario"
)
ax.plot(
res_lo["ML"].index,
res_lo["ML"].values,
c="g", ls="--", lw=3, alpha=.5
)
ax.fill_between(
res_lo["ML"].index,
res_ML["lo"].values.astype(int),
res_lo["lo"].values.astype(int),
interpolate=True,
color="g", alpha=.2, label=r"Best scenario ($R_\tau - 0.5$)"
)
ax.plot(
res_hi["ML"].index,
res_hi["ML"].values,
c="r", ls="--", lw=3, alpha=.5
)
ax.fill_between(
res_hi["ML"].index,
res_ML["hi"].values.astype(int),
res_hi["hi"].values.astype(int),
interpolate=True,
color="r", alpha=.2, label=r"Worst scenario ($R_\tau + 0.5$)"
)
ax.axvline(original.index[day], c="k", ls="--")
ax.legend(loc="upper left")
ax.axhline(0, c="k", alpha=.3)
ax.set_ylabel("New Daily Cases")
ax.set_xlim(italy.index[day-14], res_hi.index[-1])
ax.yaxis.tick_right()
ax.xaxis.set_major_locator(mdates.WeekdayLocator())
ax.xaxis.set_major_formatter(mdates.DateFormatter('%d %b'))
ax.xaxis.set_minor_locator(mdates.DayLocator())
ax.set_title(f"COVID-19 New Cases Prediction {res_ML['ML'].index[0]} (given estimated $R_t$)")
plt.show();"""